package org.example.supervise.operationcenter.enterprisesecurity

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.elasticsearch.index.query.QueryBuilders
import org.elasticsearch.search.builder.SearchSourceBuilder
import org.elasticsearch.spark.sql.EsSparkSQL
import org.example.client.DbClient
import org.example.common.Logging
import org.example.constant.ApolloConst
import org.example.utils.{CommonUtils, MysqlUtil}
import scalikejdbc.{NamedDB, SQL}

import java.sql.ResultSet

/**
 * Daily per-enterprise data-access indicators for the operation centre.
 *
 * For every operating enterprise (grouped by industry type) the job derives five ratios and
 * stores them, formatted as percentage strings, in `zcov.lkyw_data_into_info`:
 *
 *  - index_type '1': vehicle entry rate     = vehicles entered by the enterprise / operating vehicles
 *  - index_type '2': driver entry rate      = drivers entered by the enterprise / employed drivers
 *  - index_type '3': vehicle online rate    = vehicles online today / vehicles entered by the enterprise
 *  - index_type '4': picture completeness   = today's warnings with a `.gif` attachment / today's warnings
 *  - index_type '5': video completeness     = today's warnings with a `.mp4` attachment / today's warnings
 *
 * Inputs are Hive tables (`dwd.*`, `ods.*`), MySQL tables (`zcov.*`, read through [[MysqlUtil]])
 * and two Elasticsearch indices (`warninfo_index`, `warn_attach_index`).
 */
object EnterpriseDataAccess extends Logging{

  /** Industry code -> `vecent_type` value of `dwd.dwd_yz_vehicle_info`. */
  private val IndustryVehicleTypes: Seq[(String, String)] =
    Seq("1011" -> "客车", "1010" -> "危货", "1009" -> "普货")

  /** Industry code -> qualification certificate types that count as a driver of that industry. */
  private val IndustryCertificateTypes: Seq[(String, Seq[String])] = Seq(
    "1011" -> Seq("经营性道路旅客运输驾驶员"),
    "1010" -> Seq("道路危险货物运输押运人员", "道路危险货物运输驾驶员", "剧毒化学品道路运输驾驶员",
      "剧毒化学品道路运输押运人员", "爆炸品道路运输押运人员", "爆炸品道路运输驾驶员"),
    "1009" -> Seq("经营性道路货物运输驾驶员")
  )

  /** Numeric plate-colour code -> colour name as stored in `vehicle_plate_color`. */
  private val PlateColorNames: Seq[(String, String)] = Seq(
    "1" -> "蓝色", "2" -> "黄色", "3" -> "黑色", "4" -> "白色",
    "5" -> "绿色", "9" -> "其它", "96" -> "渐变绿色", "97" -> "黄绿双拼色"
  )

  /** Sub-query selecting id and name of every company whose status is "operating". */
  private val ActiveCompanies: String =
    "(select business_owner_id,business_owner_name from dwd.dwd_yz_company_info where business_owner_status = '营业')"

  /** Sub-query: operating vehicles (temp view `yz_vec_info`) joined to their operating company. */
  private val ActiveCompanyVehicles: String =
    "(select a.industry_type,a.vehicle_id,a.business_owner_id,a.vehicle_number,a.vehicle_plate_color,b.business_owner_name " +
      "from yz_vec_info a inner join dwd.dwd_yz_company_info b on a.business_owner_id=b.business_owner_id " +
      "where b.business_owner_status = '营业')"

  /** Every temp view registered by [[getEnterpriseDataAccess2MysqlByDay]]; dropped by [[main]]. */
  private val TempViewNames: Seq[String] = Seq(
    "yz_company_info", "yz_drv_info", "yz_drv_num", "ent_drv_num", "drv_index_value",
    "yz_vec_info_t", "yz_vec_num", "ent_vec_info", "veh_index_value",
    "yz_vec_info", "online_veh_num", "Online_ss", "online_index_value",
    "warninfo_index", "base_into_enterprise_info", "warn_num",
    "warn_attach_index", "pic_warn_num", "pic_index_value", "sp_warn_num", "sp_index_value"
  )

  /** SQL CASE expression translating the plate-colour code held in `column` to its colour name (null when unmapped). */
  private def plateColorCase(column: String): String =
    PlateColorNames
      .map { case (code, name) => s"when $column='$code' then '$name'" }
      .mkString("(case ", " ", " end)")

  /** UNION ALL of the operating passenger / dangerous-goods / general-freight vehicles, tagged with their industry code. */
  private def operatingVehiclesSql(columns: String): String =
    IndustryVehicleTypes
      .map { case (industry, vehicleType) =>
        s"select '$industry' industry_type,$columns from dwd.dwd_yz_vehicle_info " +
          s"where vecent_type = '$vehicleType' and operation_state = '营运'"
      }
      .mkString(" union all ")

  /** UNION ALL of the person numbers holding a valid, unexpired certificate, tagged with the industry code. */
  private def qualifiedDriversSql: String =
    IndustryCertificateTypes
      .map { case (industry, certificateTypes) =>
        val inList = certificateTypes.map(t => s"'$t'").mkString(",")
        s"select '$industry' industry_type,person_number from dwd.dwd_yz_qualification_certificate_info " +
          "where license_photo_status='正常' and license_expire_date>=current_date() " +
          "and employ_certification_effective_date<=license_expire_date " +
          s"and qualification_certificate_type in ($inList)"
      }
      .mkString(" union all ")

  /** Distinct (plate number, plate colour name) pairs found in `table` whose `timeColumn` falls on the current date. */
  private def reportedTodaySql(table: String, timeColumn: String): String =
    s"select vehicleno,${plateColorCase("vehiclecolor")} vehiclecolor from $table " +
      s"where substr($timeColumn,1,10)=current_date() group by vehicleno,vehiclecolor"

  /** Runs `sql` in Spark and registers the (lazy) result as temp view `viewName`. */
  private def registerView(spark: SparkSession, viewName: String, sql: String): Unit =
    spark.sql(sql).createOrReplaceTempView(viewName)

  /**
   * Runs `sql` against MySQL through [[MysqlUtil]], names the resulting columns and registers
   * the frame as temp view `viewName`.
   */
  private def registerMysqlView(spark: SparkSession, viewName: String, sql: String, columns: String*): Unit = {
    // NOTE(review): the ResultSet is not closed here, same as before — confirm whether
    // MysqlUtil.resultSetToDataframe releases it.
    val resultSet: ResultSet = MysqlUtil.getMysqlQueryResult(sql)
    MysqlUtil.resultSetToDataframe(resultSet, spark).toDF(columns: _*).createOrReplaceTempView(viewName)
  }

  /** Loads `columns` of the Elasticsearch resource, prints a 3-row preview and registers temp view `viewName`. */
  private def registerEsView(spark: SparkSession, resource: String, esQuery: String, viewName: String, columns: String*): Unit = {
    val frame: DataFrame = EsSparkSQL.esDF(spark, resource, esQuery).select(columns.head, columns.tail: _*)
    frame.show(3, truncate = false) // debug preview kept from the original job
    frame.createOrReplaceTempView(viewName)
  }

  /**
   * Registers `viewName`: per operating company, the number of distinct warnings of today that
   * own an attachment whose path ends with `fileSuffix`. The count column is named
   * `pic_warn_num` for both pictures and videos so the view schema stays as it always was.
   */
  private def registerAttachmentWarnCount(spark: SparkSession, fileSuffix: String, viewName: String): Unit =
    registerView(spark, viewName,
      s"""
         |select t2.industry_type,t2.business_owner_id,t2.business_owner_name,count(distinct t1.INFO_ID) pic_warn_num
         |from
         |(select vehicleNo,${plateColorCase("vehicleColor")} as vehicleColor,INFO_ID
         |from warn_attach_index
         |where filePath like '%${fileSuffix}' and from_unixtime(warnTime,'yyyy-MM-dd')=current_date()) t1
         |right join $ActiveCompanyVehicles t2
         |on t1.vehicleNo=t2.vehicle_number and t1.vehicleColor=t2.vehicle_plate_color
         |group by t2.industry_type,t2.business_owner_id,t2.business_owner_name
         |""".stripMargin)

  /** Registers `targetView`: attachment warnings (from `attachmentView`) divided by all warnings of today. */
  private def registerCompletenessIndex(spark: SparkSession, indexType: String, attachmentView: String, targetView: String): Unit =
    registerView(spark, targetView,
      s"""
         |select a.industry_type,a.business_owner_id,a.social_credit_code,a.business_owner_name,'$indexType' as index_type,
         |case when warn_num is null or warn_num=0 then 0 else pic_warn_num/warn_num end as index_value
         |from warn_num a
         |inner join $attachmentView b
         |on a.industry_type=b.industry_type and a.business_owner_id=b.business_owner_id
         |""".stripMargin)

  /**
   * Projects one `*_index_value` view into the output layout. The ratio is scaled to a
   * percentage BEFORE rounding: rounding first and multiplying afterwards re-introduces binary
   * floating point noise (e.g. 0.07 * 100 = 7.000000000000001).
   */
  private def formattedIndexSql(indexView: String): String =
    "select industry_type,business_owner_id,business_owner_name,social_credit_code,index_type," +
      "concat(round(coalesce(index_value,0)*100,2),'%') as index_value," +
      s"date_format(current_timestamp(),'yyyy-MM-dd HH:mm:ss') as gmt_modified from $indexView"

  /**
   * Builds the five indicators for the current day.
   *
   * Side effects: runs three MySQL queries and two Elasticsearch reads eagerly, prints small
   * previews to stdout and registers the temp views listed in `TempViewNames` on `sparkSession`.
   *
   * @param sparkSession session used for all Hive / Elasticsearch access
   * @return one row per (industry, company, indicator) with the columns `industry_type`,
   *         `business_owner_id`, `business_owner_name`, `social_credit_code`, `index_type`,
   *         `index_value` (percentage string) and `gmt_modified`
   */
  def getEnterpriseDataAccess2MysqlByDay(sparkSession: SparkSession): DataFrame = {
    // ---- Driver entry rate (index_type '2') -------------------------------------------------
    // Operating companies per industry, the industry being derived from their operating vehicles.
    registerView(sparkSession, "yz_company_info",
      s"""
         |select t2.industry_type,t1.business_owner_id,t1.business_owner_name
         |from $ActiveCompanies t1
         |inner join (${operatingVehiclesSql("vehicle_id,business_owner_id")}) t2
         |on t1.business_owner_id=t2.business_owner_id
         |group by t2.industry_type,t1.business_owner_id,t1.business_owner_name
         |""".stripMargin)

    // Employees holding a valid certificate, with the company name they are registered under.
    registerView(sparkSession, "yz_drv_info",
      s"""
         |select b.industry_type,a.business_owner_name,employee_id from
         |(select employee_id,business_owner_name from dwd.dwd_yz_employee_info where business_owner_name!='') a
         |inner join ($qualifiedDriversSql) b
         |on a.employee_id=b.person_number
         |""".stripMargin)

    // Employed drivers per company and industry (joined on the company NAME).
    registerView(sparkSession, "yz_drv_num",
      """
        |select t1.industry_type,t1.business_owner_id,t1.business_owner_name,count(distinct t2.employee_id) employed_drv_num
        |from yz_company_info  t1 inner join yz_drv_info t2 on t1.industry_type=t2.industry_type and t1.business_owner_name=t2.business_owner_name
        |group by t1.industry_type,t1.business_owner_id,t1.business_owner_name
        |""".stripMargin)

    // Drivers each enterprise entered through the enterprise cloud (MySQL). Only the source
    // industry codes '6' and '7' are mapped; any other code yields a null industry_type.
    registerMysqlView(sparkSession, "ent_drv_num",
      """
        |select (case when b.industry_type='6' then '1010' when b.industry_type='7' then '1011' end) industry_type,a.enterprise_code,a.enterprise_name,a.social_credit_code,
        |count(distinct b.qualification_certificate_number) enter_drv_num from zcov.base_into_enterprise_info a inner join zcov.base_transport_driver b
        |on a.enterprise_code=b.enterprise_code group by industry_type,a.enterprise_code,a.enterprise_name,a.social_credit_code
        |""".stripMargin,
      "industry_type", "enterprise_code", "enterprise_name", "social_credit_code", "enter_drv_num")

    // driver entry rate = entered drivers / employed drivers
    registerView(sparkSession, "drv_index_value",
      """
        |select industry_type,business_owner_id,social_credit_code,business_owner_name,'2' as index_type,
        |case when employed_drv_num is null or employed_drv_num=0 then 0 else enter_drv_num/employed_drv_num end as index_value
        |from
        |(select t1.industry_type,t1.business_owner_id,t1.business_owner_name,t1.employed_drv_num,t2.social_credit_code,t2.enter_drv_num
        |from yz_drv_num t1 left join ent_drv_num t2 on t1.industry_type=t2.industry_type and t1.business_owner_name=t2.enterprise_name) a
        |""".stripMargin)

    // ---- Vehicle entry rate (index_type '1') ------------------------------------------------
    // Operating vehicles of the three industries.
    registerView(sparkSession, "yz_vec_info_t", operatingVehiclesSql("vehicle_id,business_owner_id"))

    // Operating vehicles per operating company and industry.
    registerView(sparkSession, "yz_vec_num",
      s"""
         |select t2.industry_type,t1.business_owner_id,t1.business_owner_name,count(distinct t2.vehicle_id) as operation_vec_num from
         |$ActiveCompanies t1
         |inner join yz_vec_info_t t2 on t1.business_owner_id=t2.business_owner_id
         |group by t2.industry_type,t1.business_owner_id,t1.business_owner_name
         |""".stripMargin)

    // Vehicles each enterprise entered through the enterprise cloud (MySQL), detail rows only.
    registerMysqlView(sparkSession, "ent_vec_info",
      s"""
         |select a.enterprise_code,a.enterprise_name,a.social_credit_code,b.plate_num,b.plate_color
         |from zcov.base_into_enterprise_info a
         |inner join
         |(select enterprise_code,plate_num,${plateColorCase("plate_color")} plate_color
         |from zcov.base_into_vehicle_info) b
         |on a.enterprise_code=b.enterprise_code
         |group by a.enterprise_code,a.enterprise_name,a.social_credit_code,b.plate_num,b.plate_color
         |""".stripMargin,
      "enterprise_code", "enterprise_name", "social_credit_code", "plate_num", "plate_color")

    // vehicle entry rate = entered vehicles / operating vehicles
    val vehicleEntryRate: DataFrame = sparkSession.sql(
      s"""
         |select industry_type,business_owner_id,social_credit_code,business_owner_name,'1' as index_type,
         |case when operation_vec_num is null or operation_vec_num=0 then 0 else enter_vec_num/operation_vec_num end as index_value
         |from
         |(select t1.industry_type,t1.business_owner_id,t1.business_owner_name,t1.operation_vec_num,t2.social_credit_code,t2.enter_vec_num
         |from yz_vec_num t1
         |left join
         |(select b.industry_type,b.business_owner_id,a.enterprise_name,a.social_credit_code,count(distinct a.plate_num,a.plate_color) enter_vec_num
         |from ent_vec_info a
         |inner join $ActiveCompanies c
         |on a.enterprise_name=c.business_owner_name
         |inner join yz_vec_info_t b
         |on b.business_owner_id=c.business_owner_id
         |group by b.industry_type,b.business_owner_id,a.enterprise_name,a.social_credit_code) t2
         |on t1.industry_type=t2.industry_type and t1.business_owner_name=t2.enterprise_name) ee
         |where ee.operation_vec_num>0
         |""".stripMargin)
    vehicleEntryRate.show(3) // debug preview kept from the original job
    vehicleEntryRate.createOrReplaceTempView("veh_index_value")

    // ---- Vehicle online rate (index_type '3') -----------------------------------------------
    // Operating vehicles again, this time with plate number and plate colour.
    registerView(sparkSession, "yz_vec_info",
      operatingVehiclesSql("vehicle_id,business_owner_id,vehicle_number,vehicle_plate_color"))

    // Vehicles that appear in today's location or warning messages, per operating company.
    registerView(sparkSession, "online_veh_num",
      s"""
         |select t1.industry_type,t3.business_owner_id,t3.business_owner_name,count(distinct t2.vehicleno,t2.vehiclecolor) online_vec_num from yz_vec_info t1
         |inner join
         |(${reportedTodaySql("dwd.dwd_up_exg_msg_real_location", "datetime")} union all ${reportedTodaySql("ods.kafka_up_warn_msg_adpt_info_i", "warntime")}) t2
         |on t1.vehicle_number=t2.vehicleno and t1.vehicle_plate_color=t2.vehiclecolor
         |inner join $ActiveCompanies t3
         |on t1.business_owner_id=t3.business_owner_id group by t1.industry_type,t3.business_owner_id,t3.business_owner_name
         |""".stripMargin)

    // Online vehicles next to the entered vehicles (matched by plate) of every company.
    // NOTE(review): business_owner_name comes from the nullable side of the RIGHT JOIN, so
    // companies without any online vehicle get a null name — confirm whether that is intended.
    registerView(sparkSession, "Online_ss",
      s"""
         |select t2.industry_type,t2.business_owner_id,t1.business_owner_name,t1.online_vec_num,t2.social_credit_code,t2.enter_vec_num
         |from online_veh_num t1
         |right join
         |(select b.industry_type,b.business_owner_id,a.enterprise_name,a.social_credit_code,count(distinct a.plate_num,a.plate_color) enter_vec_num
         |from ent_vec_info a
         |inner join yz_vec_info b
         |on a.plate_num=b.vehicle_number and a.plate_color=b.vehicle_plate_color
         |inner join $ActiveCompanies c
         |on b.business_owner_id=c.business_owner_id
         |group by b.industry_type,b.business_owner_id,a.enterprise_name,a.social_credit_code) t2
         |on t1.industry_type=t2.industry_type and t1.business_owner_id=t2.business_owner_id
         |""".stripMargin)

    // vehicle online rate = online vehicles / entered vehicles
    registerView(sparkSession, "online_index_value",
      """
        |select industry_type,business_owner_id,social_credit_code,business_owner_name,'3' as index_type,
        |case when enter_vec_num is null or enter_vec_num=0 then 0 else online_vec_num/enter_vec_num end as  index_value from Online_ss
        |""".stripMargin)

    // ---- Picture / video completeness (index_type '4' / '5') --------------------------------
    // Both Elasticsearch indices are read with an unrestricted match_all query.
    val searchSource: SearchSourceBuilder = new SearchSourceBuilder()
    searchSource.query(QueryBuilders.matchAllQuery())
    val esQuery: String = searchSource.toString

    registerEsView(sparkSession, "warninfo_index/_doc", esQuery, "warninfo_index",
      "vehicleColor", "vehicleEnterpriseCode", "vehicleNo", "datetime", "primaryKey")

    // NOTE(review): enterprise_code is selected but not grouped; this only runs when MySQL's
    // ONLY_FULL_GROUP_BY mode is off and then returns an arbitrary code per group — confirm.
    registerMysqlView(sparkSession, "base_into_enterprise_info",
      """
        |select enterprise_name,enterprise_code,social_credit_code from zcov.base_into_enterprise_info
        |group by enterprise_name,social_credit_code
        |""".stripMargin,
      "enterprise_name", "enterprise_code", "social_credit_code")

    // All warnings of today per operating company.
    registerView(sparkSession, "warn_num",
      s"""
         |select t2.industry_type,t2.business_owner_id,t2.business_owner_name,t1.social_credit_code,count(distinct t1.primaryKey) warn_num
         |from
         |(select a.vehicleColor,a.vehicleEnterpriseCode,a.vehicleNo,a.primaryKey,b.enterprise_name,b.social_credit_code
         |from
         |(select ${plateColorCase("vehicleColor")} as vehicleColor,vehicleEnterpriseCode,vehicleNo,primaryKey
         |from warninfo_index
         |where from_unixtime(datetime,'yyyy-MM-dd')=current_date()) a
         |inner join base_into_enterprise_info b
         |on a.vehicleEnterpriseCode=b.enterprise_code) t1
         |right join $ActiveCompanyVehicles t2
         |on t1.vehicleNo=t2.vehicle_number and t1.vehicleColor=t2.vehicle_plate_color
         |group by t2.industry_type,t2.business_owner_id,t2.business_owner_name,t1.social_credit_code
         |""".stripMargin)

    registerEsView(sparkSession, "warn_attach_index/_doc", esQuery, "warn_attach_index",
      "vehicleNo", "vehicleColor", "INFO_ID", "warnTime", "filePath")

    registerAttachmentWarnCount(sparkSession, ".gif", "pic_warn_num")
    registerCompletenessIndex(sparkSession, "4", "pic_warn_num", "pic_index_value")

    registerAttachmentWarnCount(sparkSession, ".mp4", "sp_warn_num")
    // The video ratio must be fed by the .mp4 counts; it used to join pic_warn_num and thereby
    // reported the picture ratio a second time.
    registerCompletenessIndex(sparkSession, "5", "sp_warn_num", "sp_index_value")

    // ---- Combine the five indicators --------------------------------------------------------
    val indexViews = Seq("drv_index_value", "veh_index_value", "online_index_value", "pic_index_value", "sp_index_value")
    sparkSession.sql(indexViews.map(formattedIndexSql).mkString(" union all "))
  }

  /**
   * Job entry point: computes the indicators, upserts them into `zcov.lkyw_data_into_info`,
   * drops the temp views and closes the Spark session (also when the job fails).
   *
   * @param args unused
   */
  def main(args: Array[String]): Unit = {
    val sparkSession = CommonUtils.getSparkSession()
    try {
      val indicators: DataFrame = getEnterpriseDataAccess2MysqlByDay(sparkSession)
      val poolName = "lkyw_data_into_info"
      val upsertSql =
        s"""
           |replace into zcov.lkyw_data_into_info(enterprise_code,social_credit_code,enterprise_name,belong_industry,index_type,index_value,gmt_modified) values(?,?,?,?,?,?,?)
           |""".stripMargin

      // One pool initialisation and one auto-commit session per partition instead of per row.
      indicators.rdd.foreachPartition { rows =>
        if (rows.hasNext) {
          DbClient.init(poolName,
            ApolloConst.jgdMysqlDriver, ApolloConst.jgdMysqlURL,
            ApolloConst.jgdMysqlUserName,
            ApolloConst.jgdMysqlPassWord)
          DbClient.usingDB(poolName) { db: NamedDB =>
            db autoCommit { implicit session =>
              rows.foreach { row =>
                // Result columns: 0 industry_type, 1 business_owner_id, 2 business_owner_name,
                // 3 social_credit_code, 4 index_type, 5 index_value, 6 gmt_modified.
                SQL(upsertSql)
                  .bind(row.getString(1), row.getString(3), row.getString(2), row.getString(0),
                    row.getString(4), row.getString(5), row.getString(6))
                  .update().apply()
              }
            }
          }
        }
      }

      TempViewNames.foreach(viewName => sparkSession.catalog.dropTempView(viewName))
    } finally {
      sparkSession.close()
    }
  }
}
